import io.openmessaging.BytesMessage;
import io.openmessaging.demo.MessageUtils2;
import io.openmessaging.demo.MiscUtils;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Author :  Rocky
 * Date : 27/05/2017 22:59
 * Description : Timing harness in which several producer threads build messages with
 *               {@code MiscUtils.buildMsg()} and put them on a shared queue, while a single
 *               consumer thread takes them off the queue and passes each one to
 *               {@code MessageUtils2.writeMessage} together with a memory-mapped view of
 *               the file {@code testfile}.
 * Test : Run {@link #main(String[])}; it prints the producer, consumer and total elapsed
 *        times in milliseconds and then exits.
 */
public class MultiThreadProduceSingleThreadConsumTest {

    /**
     * Runs the benchmark once and prints the elapsed times.
     *
     * @param args ignored
     * @throws InterruptedException  if the main thread is interrupted while waiting for the consumer
     * @throws IOException           if {@code testfile} cannot be opened or mapped
     * @throws IllegalStateException if the consumer task fails before handling every message
     */
    public static void main(String[] args) throws InterruptedException, IOException {

        final long s = System.currentTimeMillis();

        final int producerCount = 10;
        final int msgCount = 10000000;

        ExecutorService producerThreadPool = Executors.newFixedThreadPool(producerCount);
        ExecutorService consumerThreadPool = Executors.newFixedThreadPool(1);
        LinkedBlockingQueue<BytesMessage> messages = new LinkedBlockingQueue<>();
        // One count per producer task (not per message): each task counts down exactly once,
        // in a finally block, so the timing thread below can never be left waiting.
        CountDownLatch producersDone = new CountDownLatch(producerCount);

        try {
            for (int i = 0; i < producerCount; i++) {
                // Derive each producer's share from the totals so the shares always add up
                // to msgCount, even when msgCount is not a multiple of producerCount.
                int quota = msgCount / producerCount + (i < msgCount % producerCount ? 1 : 0);
                producerThreadPool.submit(() -> produce(messages, quota, producersDone));
            }

            // Reports how long the producers took, independently of the consumer.
            new Thread(() -> {
                try {
                    producersDone.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                System.out.println("生产者完成，耗时 " + (System.currentTimeMillis() - s) + " ms");
            }).start();

            try (RandomAccessFile raf = new RandomAccessFile("testfile", "rw")) {
                // NOTE(review): the mapping is Integer.MAX_VALUE bytes; whether that is enough
                // for msgCount messages depends on the message size, which is not visible here.
                MappedByteBuffer mbb =
                        raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE);

                long s1 = System.currentTimeMillis();
                Future<?> consumerDone =
                        consumerThreadPool.submit(() -> consume(messages, msgCount, mbb));

                try {
                    // Waiting on the Future (rather than a latch) surfaces a consumer
                    // failure here instead of blocking forever.
                    consumerDone.get();
                } catch (ExecutionException ex) {
                    throw new IllegalStateException(
                            "consumer failed before handling all " + msgCount + " messages",
                            ex.getCause());
                }

                long e = System.currentTimeMillis();
                System.out.println("消费者完成，耗时 " + (e - s1) + " ms");

                System.out.println("总耗时 " + (e - s) + " ms");
            }
        } finally {
            // Pool threads are non-daemon; without this the JVM would never exit.
            // shutdownNow also interrupts any worker still blocked after a failure.
            producerThreadPool.shutdownNow();
            consumerThreadPool.shutdownNow();
        }
    }

    /** Builds {@code quota} messages and enqueues them, then signals {@code done} exactly once. */
    private static void produce(LinkedBlockingQueue<BytesMessage> messages, int quota,
                                CountDownLatch done) {
        try {
            for (int j = 0; j < quota; j++) {
                BytesMessage msg = MiscUtils.buildMsg();
                if (!messages.offer(msg)) {
                    System.out.println("队列满");
                    messages.put(msg);
                }
            }
        } catch (InterruptedException e) {
            // Stop producing and keep the interrupt visible to the pool.
            Thread.currentThread().interrupt();
        } finally {
            done.countDown();
        }
    }

    /** Takes exactly {@code expected} messages from the queue and writes each one via {@code mbb}. */
    private static void consume(LinkedBlockingQueue<BytesMessage> messages, int expected,
                                MappedByteBuffer mbb) {
        try {
            for (int consumed = 0; consumed < expected; consumed++) {
                BytesMessage msg = messages.poll();
                if (msg == null) {
                    System.out.println("队列空");
                    msg = messages.take();
                }
                MessageUtils2.writeMessage(msg, mbb);
            }
        } catch (InterruptedException e) {
            // Only raised when the pool is shut down during cleanup; exit quietly.
            Thread.currentThread().interrupt();
        }
    }

}
